/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mrunit.mock;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.util.ReflectionUtils;


/**
 * OutputCollector to use in the test framework for Mapper and Reducer
 * classes. Accepts a set of output (k, v) pairs and returns them to the
 * framework for validation.
 *
 * Every collected key and value is deep-copied through the Hadoop
 * serialization framework before being stored, so a mapper/reducer that
 * reuses the same key or value instance across calls to collect() does not
 * overwrite pairs that were recorded earlier.
 *
 * The collector keeps a single pair of scratch buffers that are reused by
 * every copy; no synchronization is performed around them.
 */
public class MockOutputCollector<K, V> implements OutputCollector<K, V> {

  /** Deep copies of every pair passed to collect(), in call order. */
  private final List<Pair<K, V>> collectedOutputs;

  /** Source of the (de)serializers used to copy keys and values. */
  private final SerializationFactory serializationFactory;

  /** Scratch buffer that receives the serialized form of an object. */
  private final DataOutputBuffer outBuffer;

  /** Scratch buffer the copy is deserialized from. */
  private final DataInputBuffer inBuffer;

  /** Configuration handed to the serialization factory and to new instances. */
  private final Configuration conf;


  /**
   * Creates an empty collector backed by a default Configuration.
   */
  public MockOutputCollector() {
    collectedOutputs = new ArrayList<Pair<K, V>>();

    outBuffer = new DataOutputBuffer();
    inBuffer = new DataInputBuffer();

    conf = new Configuration();
    serializationFactory = new SerializationFactory(conf);
  }


  /**
   * Instantiates klazz through ReflectionUtils, passing this collector's
   * configuration.
   *
   * @param klazz the class to instantiate
   * @return a new instance of klazz
   */
  private <T> T getInstance(Class<T> klazz) {
    // Class.cast keeps this type-safe whether or not newInstance is generic.
    return klazz.cast(ReflectionUtils.newInstance(klazz, conf));
  }


  /**
   * Builds the exception reported when the serialization factory cannot
   * supply a serializer/deserializer pair for a class.
   *
   * @param klazz the class that could not be (de)serialized
   * @param cause the underlying failure, or null if there is none
   * @return the exception to throw
   */
  private IOException noSerializationFor(Class<?> klazz, Throwable cause) {
    IOException ioe = new IOException(
        "No applicable Serialization found in conf at io.serializations for "
        + klazz.getName());
    if (null != cause) {
      ioe.initCause(cause);
    }
    return ioe;
  }


  /**
   * Returns an independent copy of obj, produced by serializing it into a
   * scratch buffer and deserializing that buffer into a fresh instance of
   * the same class.
   *
   * @param obj the object to copy; may be null
   * @return the copy, or null if obj is null
   * @throws IOException if no serialization is available for the object's
   *         class, or if serializing/deserializing fails
   */
  private <T> T deepCopy(T obj) throws IOException {

    if (null == obj) {
      return null;
    }

    // Safe: the copy is created from, and deserialized as, obj's own runtime
    // class, which is by definition assignable to T.
    @SuppressWarnings("unchecked")
    Class<T> klazz = (Class<T>) obj.getClass();

    Serializer<T> s;
    Deserializer<T> ds;
    try {
      s = serializationFactory.getSerializer(klazz);
      ds = serializationFactory.getDeserializer(klazz);
    } catch (NullPointerException npe) {
      // NOTE(review): some Hadoop versions appear to fail with an NPE inside
      // the factory (rather than returning null) when no Serialization
      // accepts the class; report both cases the same way.
      throw noSerializationFor(klazz, npe);
    }
    if (null == s || null == ds) {
      throw noSerializationFor(klazz, null);
    }

    T out = getInstance(klazz); // the output object to return.

    // Track what was actually opened so the cleanup below never calls
    // close() on a (de)serializer that was not opened, which could raise a
    // secondary exception and hide the real failure.
    boolean serializerOpen = false;
    boolean deserializerOpen = false;
    try {
      // Open each stream only once its buffer is in the state the
      // (de)serializer is meant to see: empty for output, freshly filled
      // for input.
      outBuffer.reset();
      s.open(outBuffer);
      serializerOpen = true;
      s.serialize(obj);

      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
      ds.open(inBuffer);
      deserializerOpen = true;

      return ds.deserialize(out);
    } finally {
      if (serializerOpen) {
        try {
          s.close();
        } catch (IOException ignored) {
          // ignore this; we're closing.
        }
      }

      if (deserializerOpen) {
        try {
          ds.close();
        } catch (IOException ignored) {
          // ignore this; we're closing.
        }
      }
    }
  }

  /**
   * Accepts another (key, value) pair as an output of this mapper/reducer.
   * Deep copies of the key and the value are stored, not the instances
   * passed in.
   *
   * @param key the output key; may be null
   * @param value the output value; may be null
   * @throws IOException if the key or value cannot be copied
   */
  public void collect(K key, V value) throws IOException {
    collectedOutputs.add(new Pair<K, V>(deepCopy(key), deepCopy(value)));
  }

  /**
   * @return The outputs generated by the mapper/reducer being tested, in the
   *         order they were collected. This is the collector's own backing
   *         list, not a copy.
   */
  public List<Pair<K, V>> getOutputs() {
    return collectedOutputs;
  }
}

